package com.tdd.consumer;

import com.tdd.common.JSONSerializer;
import com.tdd.common.RpcEncoder;
import com.tdd.common.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * RPC consumer. Looks up provider addresses in ZooKeeper, connects to each of
 * them with a Netty {@link Bootstrap}, and exposes remote services through JDK
 * dynamic proxies created by {@link #createProxy(Class)}.
 *
 * @author tudedong
 * @description Consumer
 * @date 2020-06-06 19:11:47
 */
public class NettyClient {

    /** Root znode of the service registry. */
    private static final String BASE_SERVICE = "/zookeeper";
    /** Znode (relative to {@link #BASE_SERVICE}) whose children hold provider addresses as {@code host:port}. */
    private static final String SERVICE_NAME = "/server";

    /** ZooKeeper connect string used by {@link #connectServer()}. */
    private static final String ZOOKEEPER_ADDRESS = "47.101.41.95:2181";
    /** ZooKeeper session timeout in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 50000;

    /** Released once the ZooKeeper session reports {@code SyncConnected}. */
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    /** Netty client bootstrap; created by {@link #initClient()} and reused for every provider connection. */
    private static Bootstrap bootstrap;

    /**
     * Fixed-size pool (one thread per available processor) on which the client
     * handler task is executed for each proxied call.
     */
    private static final ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /**
     * Shared channel handler; it is added to every channel pipeline and also
     * submitted to {@link #executorService} to perform the actual call.
     */
    private static UserClientHandler userClientHandler;

    /**
     * Initializes the client: creates the handler and the Netty bootstrap, then
     * connects to ZooKeeper and to every provider currently registered there.
     *
     * <p>Synchronized so that it cannot run concurrently with the lazy
     * initialization performed by proxies (see {@link #ensureInitialized()}).
     */
    public static synchronized void initClient() {
        // Must be assigned before any connect: the channel initializer below reads this field.
        userClientHandler = new UserClientHandler();

        EventLoopGroup group = new NioEventLoopGroup();

        bootstrap = new Bootstrap();
        bootstrap.group(group)
                // NIO socket transport
                .channel(NioSocketChannel.class)
                // Disable Nagle's algorithm so small requests are sent immediately
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline channelPipeline = socketChannel.pipeline();
                        // Outbound: encode RpcRequest objects with the JSON serializer
                        channelPipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
                        // Inbound: the server response is decoded as a String
                        channelPipeline.addLast(new StringDecoder());
                        // Application handler (the same instance is shared by all channels)
                        channelPipeline.addLast(userClientHandler);
                    }
                });

        // Discover providers through ZooKeeper and connect to them
        ZooKeeper zooKeeper = connectServer();
        if (zooKeeper != null) {
            watchNode(zooKeeper);
        }
    }

    /**
     * Reads the provider addresses registered under the service znode, connects
     * to each of them, and leaves a child watch that re-runs this method when
     * the set of children changes.
     *
     * <p>NOTE(review): every invocation connects to all listed providers again,
     * including ones connected earlier; confirm this is what
     * {@code UserClientHandler} expects.
     *
     * @param zooKeeper the ZooKeeper session to query
     */
    private static void watchNode(final ZooKeeper zooKeeper) {
        final String servicePath = BASE_SERVICE + SERVICE_NAME;

        try {
            List<String> nodeList = zooKeeper.getChildren(servicePath, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
                        // Children changed: refresh the provider list (this also re-registers the watch)
                        System.out.println("注意：服务端节点发生变化，，，");
                        watchNode(zooKeeper);
                    }
                }
            });

            // Phase 1: read all addresses first, so a ZooKeeper failure happens before any connect
            List<String> dataList = new ArrayList<String>();
            for (String node : nodeList) {
                byte[] bytes = zooKeeper.getData(servicePath + "/" + node, true, null);
                if (bytes == null) {
                    // A znode may carry no data; there is no address to connect to
                    continue;
                }
                // Decode with an explicit charset instead of the platform default
                dataList.add(new String(bytes, StandardCharsets.UTF_8));
            }

            // Phase 2: connect to every well-formed "host:port" entry
            for (String serviceAddress : dataList) {
                String[] addresses = serviceAddress.split(":");
                if (addresses.length == 2) {
                    String host = addresses[0];
                    int port = Integer.parseInt(addresses[1]);
                    // Block until the connection attempt completes
                    bootstrap.connect(host, port).sync();
                    System.out.println("客户端连接["+host+":"+port+"]成功！");
                }
            }

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Opens a ZooKeeper session and blocks until it reports {@code SyncConnected}.
     *
     * @return the ZooKeeper handle, or {@code null} if the client could not be
     *         created because of an {@link IOException}; if the wait is
     *         interrupted the handle is still returned and the thread's
     *         interrupt flag is set
     */
    private static ZooKeeper connectServer() {
        ZooKeeper zooKeeper = null;
        try {
            zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT_MS, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently clearing it
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return zooKeeper;
    }

    /**
     * Runs {@link #initClient()} once, on first use. Holding the class monitor
     * makes concurrent first calls wait until initialization has finished
     * instead of proceeding as soon as the handler field becomes non-null.
     */
    private static synchronized void ensureInitialized() {
        if (userClientHandler == null) {
            initClient();
        }
    }

    /**
     * Answers {@code equals}, {@code hashCode} and {@code toString} for a proxy
     * locally, using identity semantics, so they are never sent to a provider.
     *
     * @param proxy  the proxy instance the method was invoked on
     * @param method the {@link Object} method being invoked
     * @param args   the invocation arguments (only used for {@code equals})
     * @return the boxed result of the method
     */
    private static Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
        String name = method.getName();
        if ("equals".equals(name)) {
            return Boolean.valueOf(proxy == args[0]);
        }
        if ("hashCode".equals(name)) {
            return Integer.valueOf(System.identityHashCode(proxy));
        }
        if ("toString".equals(name)) {
            return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
        }
        throw new UnsupportedOperationException(method.toString());
    }

    /**
     * Creates a JDK dynamic proxy for the given service interface. Each call on
     * the proxy is turned into an {@link RpcRequest}, handed to the shared
     * client handler, and executed on {@link #executorService}; the calling
     * thread blocks until the result is available.
     *
     * <p>NOTE(review): all proxies share one handler instance that stores the
     * current request, so concurrent invocations may overwrite each other's
     * request; confirm against {@code UserClientHandler}.
     *
     * @param serviceClass the service interface to proxy
     * @return a proxy implementing {@code serviceClass}
     */
    public static Object createProxy(Class<?> serviceClass) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, new InvocationHandler() {
                    @Override
                    public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
                        // equals/hashCode/toString are routed here too; never send them over the wire
                        if (method.getDeclaringClass() == Object.class) {
                            return invokeObjectMethod(o, method, objects);
                        }

                        // Lazily initialize the client on first use
                        ensureInitialized();

                        // Describe the invocation for the provider
                        RpcRequest rpcRequest = new RpcRequest();
                        rpcRequest.setRequestId(UUID.randomUUID().toString());
                        rpcRequest.setClassName(method.getDeclaringClass().getName());
                        rpcRequest.setMethodName(method.getName());
                        rpcRequest.setParameterTypes(method.getParameterTypes());
                        rpcRequest.setParameters(objects);
                        System.out.println("请求内容："+rpcRequest);

                        // Hand the request to the shared handler
                        userClientHandler.setRpcRequest(rpcRequest);

                        // Run the handler on the pool and wait for its result
                        return executorService.submit(userClientHandler).get();
                    }
                });
    }

}
